篇首语:本文由编程笔记#小编为大家整理,主要介绍了Spark MLlib算法调用展示平台及其实现过程相关的知识,希望对你有一定的参考价值。
1)配置好db.properties中相应用户名密码/数据库等参数;
2)第一次启动tomcat,修改hibernate.cfg.xml文件中的hibernate.hbm2ddl.auto值为create,第二次启动修改为update;
3) 打开集群参数页面,点击初始化,初始化集群参数,如果集群参数和当前集群不匹配,那么需要做相应修改;
暂时考虑使用配置文件的方式来配置集群参数,如果要调整为数据库配置,那么修改Utisl.dbOrFile参数即可;即,暂时只需修改utisl.properties文件;
4)拷贝Spark_MLlib_Algorithm_1.6.0工程生成的算法到到3)中spark.jar所在路径;
5)拷贝集群中的yarn-site.xml到3)中spark.files所在路径;
6)拷贝spark-assembly-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar到3)中spark.yarn.jar所在路径;
package com.fz.classification
在上面的代码中,有对每个参数的解释,包括参数的含义,参数有哪些参数等;
import com.fz.util.Utils
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD, LogisticRegressionWithLBFGS
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.SparkConf, SparkContext
/**
* 逻辑回归封装算法
* Labels used in Logistic Regression should be 0, 1, ..., k - 1 for k classes multi-label classification problem
* 输入参数:
* testOrNot : 是否是测试,正常情况设置为false
* input:输出数据;
* minPartitions : 输入数据最小partition个数
* output:输出路径
* targetIndex:目标列所在下标,从1开始
* splitter:数据分隔符;
* method:使用逻辑回归算法:"SGD" or "LBFGS"
* hasIntercept : 是否具有截距
* numClasses: 目标列类别个数;
* Created by fanzhe on 2016/12/19.
*/
object LogisticRegression
def main (args: Array[String])
if(args.length != 9)
println("Usage: com.fz.classification.LogisticRegression testOrNot input minPartitions output targetIndex " +
"splitter method hasIntercept numClasses")
System.exit(-1)
val testOrNot = args(0).toBoolean // 是否是测试,sparkContext获取方式不一样, true 为test
val input = args(1)
val minPartitions = args(2).toInt
val output = args(3)
val targetIndex = args(4).toInt // 从1开始,不是从0开始要注意
val splitter = args(5)
val method = args(6) //should be "SGD" or "LBFGS"
val hasIntercept = args(7).toBoolean
val numClasses = args(8).toInt
val sc = Utils.getSparkContext(testOrNot,"Logistic Create Model")
// construct data
// Load and parse the data
val training = Utils.getLabeledPointData(sc,input,minPartitions,splitter,targetIndex).cache()
// Run training algorithm to build the model
val model = method match
case "SGD" => new LogisticRegressionWithSGD()
.setIntercept(hasIntercept)
.run(training)
case "LBFGS" => new LogisticRegressionWithLBFGS().setNumClasses(numClasses)
.setIntercept(hasIntercept)
.run(training)
case _ => throw new RuntimeException("no method")
// save model
model.save(sc,output)
sc.stop()
在Main函数中,首先对各个参数进行获取并赋值变量,接着就是获取SparkContext;
其中,最重要的部分就是调用Spark自己封装的LogisticRegressionWithSGD 或 LogisticRegressionWithBFGS类进行逻辑回归建模;
最后,调用模型的save方法,把模型固化到HDFS上;
基本,所有的算法封装都采取这种模式,及对Spark MLlib中原生的算法再加一层封装。
2. 测试
测试主要使用JUnit进行测试,其逻辑回归示例代码如下:
代码清单3-2 逻辑回归算法封装测试(Scala)
package com.fz.classification
这里面的方法都是第一步先构建算法参数;接着调用main方法;第三步,查看输出中是否具有模型的相关信息;
import java.io.File
import com.fz.util.Utils
import org.junit.Assert, Test
import Assert._
/**
* 测试Logistics Regression算法
* Created by fanzhe on 2016/12/19.
*/
@Test
class LogisticRegressionTest
@Test
def testMain1()=
// testOrNot input output targetIndex splitter method hasIntercept numClasses
val args = Array(
"true",
"./src/data/classification_regression/logistic.dat",
"2",
"./target/logistic/tmp1",
"1",
" ",
"SGD",
"true",
"2" // this parameter is useless
)
// 删除输出目录
Utils.deleteOutput(args(3))
LogisticRegression.main(args)
assertTrue(Utils.fileContainsClassName(args(3)+"/metadata/part-00000",
"org.apache.spark.mllib.classification.LogisticRegressionModel"))
@Test
def testMain2()=
// testOrNot input minPartitions output targetIndex splitter method hasIntercept numClasses
val args = Array(
"true",
"./src/data/classification_regression/logistic.dat",
"2",
"./target/logistic/tmp2",
"1",
" ",
"LBFGS",
"true",
"2"
)
// 删除输出目录
Utils.deleteOutput(args(3))
LogisticRegression.main(args)
assertTrue(Utils.fileContainsClassName(args(3)+"/metadata/part-00000",
"org.apache.spark.mllib.classification.LogisticRegressionModel"))
当然,这里面还可以添加多个测试方法,使用不同的算法参数或数据进行测试;(读者可自行添加)
1) 获取JobInfo中最新的records条记录;
其代码如下所示:
2) 查找其中isFinished字段为false的数据;
3) 根据2)中查找的数据,去YARN获取其实时状态,并更新1)中的数据,然后存入数据库中;
4) 根据row和page字段分页返回JSON数据;
代码清单3-3 更新监控任务列表
public void getJobInfo()
第一步通过dBService获取给定records个记录;第二步则更新这些记录;看下HUtils.updateJobInfo的实现:
Map
// 1.
List jobInfos = dBService.getLastNRows("JobInfo","jobId",true,records);
// 2,3
List list = null;
try
list = HUtils.updateJobInfo(jobInfos);
if(list != null || list.size()>0)
dBService.updateTableData(list);
catch (Exception e)
e.printStackTrace();
log.warn("更新任务状态异常!");
jsonMap.put("total", 0);
jsonMap.put("rows", null);
Utils.write2PrintWriter(JSON.toJSONString(jsonMap));
return ;
// 4.
jsonMap.put("total",list.size());
jsonMap.put("rows",Utils.getSubList(list,page,rows));
Utils.write2PrintWriter(JSON.toJSONString(jsonMap));
代码清单3-4 获取任务最新状态
public static List updateJobInfo(List jobInfos)throws YarnException,IOException
这里的工作就是根据数据库中任务的状态,只查询任务没有完成的任务的最新状态,并更新原始任务状态,最后把更新后的或者原始任务添加到list中,并返回;
List list &#61; new ArrayList<>();
JobInfo jobInfo;
for(Object o :jobInfos)
jobInfo &#61; (JobInfo) o;
if(!jobInfo.isFinished()) // 如果没有完成&#xff0c;则检查其最新状态
ApplicationReport appReport&#61;null;
try
appReport &#61; getClient().getApplicationReport(SparkUtils.getAppId(jobInfo.getJobId()));
catch (YarnException | IOException e)
e.printStackTrace();
throw e;
/**
* NEW, 0
NEW_SAVING, 1
SUBMITTED, 2
ACCEPTED, 3
RUNNING, 4
FINISHED, 5
FAILED, 6
KILLED; 7
*/
switch (appReport.getYarnApplicationState().ordinal())
case 0 | 1 | 2 |3 : // 都更新为Accepted状态
jobInfo.setRunState(JobState.ACCETPED);
break;
case 4 :
jobInfo.setRunState(JobState.RUNNING);break;
case 5:
// UNDEFINED,
// SUCCEEDED,
// FAILED,
// KILLED;
switch (appReport.getFinalApplicationStatus().ordinal())
case 1: jobInfo.setRunState(JobState.SUCCESSED);
SparkUtils.cleanupStagingDir(jobInfo.getJobId());
jobInfo.setFinished(true);break;
case 2:
jobInfo.setRunState(JobState.FAILED);
SparkUtils.cleanupStagingDir(jobInfo.getJobId());
jobInfo.setErrorInfo(appReport.getDiagnostics().substring(0,Utils.EXCEPTIONMESSAGELENGTH));
jobInfo.setFinished(true);break;
case 3:
jobInfo.setRunState(JobState.KILLED);
SparkUtils.cleanupStagingDir(jobInfo.getJobId());
jobInfo.setFinished(true);break;
default: log.warn("App:" &#43; jobInfo.getJobId() &#43; "获取任务状态异常! " &#43;
"appReport.getFinalApplicationStatus():"&#43;appReport.getFinalApplicationStatus().name()
&#43;",ordinal:"&#43; appReport.getFinalApplicationStatus().ordinal());
break;
case 6:
jobInfo.setRunState(JobState.FAILED);
SparkUtils.cleanupStagingDir(jobInfo.getJobId());
jobInfo.setErrorInfo(appReport.getDiagnostics().substring(0,Utils.EXCEPTIONMESSAGELENGTH));
jobInfo.setFinished(true);break;
case 7:
jobInfo.setRunState(JobState.KILLED);
SparkUtils.cleanupStagingDir(jobInfo.getJobId());
jobInfo.setFinished(true);break;
default: log.warn("App:" &#43; jobInfo.getJobId() &#43; "获取任务状态异常!"&#43;
"appReport.getYarnApplicationState():"&#43;appReport.getYarnApplicationState().name()
&#43;",ordinal:"&#43; appReport.getYarnApplicationState().ordinal());
jobInfo.setModifiedTime(new Date());
list.add(jobInfo);// 把更新后的或原始的JobInfo添加到list中
return list;
在代码清单3-3中&#xff0c;返回更新后的list后&#xff0c;接着调用了DBService.updateTableData,对数据进行固化&#xff1b;最后&#xff0c;使用subList对list进行截取&#xff0c;返回给前台某个分页的数据。
1&#xff09;编写src/main/java/下算法对应的Thread&#xff1b;
2&#xff09;编写webapp下的对应页面&#xff1b;
3&#xff09;编写webapp/js下对应的js&#xff1b;
4&#xff09;修改webapp/preprocess/upload.jsp&#xff0c;添加一条数据上传记录&#xff0c;并在main/data下添加对应的数据&#xff1b;
5&#xff09;启动工程&#xff0c;在页面上传数据&#xff0c;然后选择算法&#xff0c;设置参数&#xff0c;即可提交任务&#xff0c;提交任务后在监控界面即可看到算法运行状态&#xff1b;
工程状态&#xff08;假设Scala工程为工程1&#xff0c;调用Spark算法工程为工程2&#xff09;&#xff1a;
工程1&#xff1a;
基本封装了Spark Mllib中的数据挖掘相关算法&#xff0c;包括聚类、分类、回归、协同过滤、降维、频繁集挖掘&#xff08;这个还有点问题&#xff09;&#xff1b;
工程2&#xff1a;
目前只做了分类和回归算法的相关页面以及调用&#xff1b;
所以&#xff0c;如果你要在这个版本上开发&#xff0c;那么可以参考上面的流程先试着编写ALS算法的调用即可。